package com.smp.support;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * @author SmallPlume
 * @title: ScheduledMessageConsumer
 * @projectName Smpcloud-release
 * @description: TODO
 * @date 2020/3/23 23:38
 */
@Slf4j
@Component
public class ScheduledMessageConsumer {

    /**
     * 消费者实体对象
     */
    private DefaultMQPushConsumer consumer;

    public ScheduledMessageConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer("ExampleConsumer");

        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setInstanceName("ScheduledMessageConsumer");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // Subscribe topics
        consumer.subscribe("schedule_message_test_topic", "*");
        // Register message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                    try {
                        String body = new String(message.getBody(), "utf-8");
                        System.out.println("[" + body + "]Receive message[msgId=" + message.getMsgId() + "] "
                                + System.currentTimeMillis());
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch consumer
        consumer.start();
    }
}
